Kafka REST Proxy快速入门

Kafka REST Proxy是Confluent的一个组件,它提供了访问Kafka集群的RESTful接口。对于一些暂不支持Kafka的平台或编程语言来说,可以使用Kafka REST Proxy来发送和消费消息,查看集群状态和执行集群管理命令。

安装

启动

  • cd confluent-<version>/

  • 配置

    • 修改etc/kafka-rest/kafka-rest.properties的zookeeper.connect为Kafka的zookeeper地址
    • 因为我的应用并不需要发送/消费Avro格式的消息,也就不需要Schema Registry,所以将etc/kafka-rest/kafka-rest.properties的schema.registry.url配置项注释掉
  • 启动,bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

发送JSON消息


1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}


发送的消息体必须是key为records的JSON,每个record可以有key和value属性,其中value是必须的

消费JSON消息

创建Consumer实例

1
2
3
4
5
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "smallest"}' \
http://localhost:8082/consumers/my_consumer_group
{"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_consumer_group/instances/my_consumer_instance"}

使用已创建的Consumer实例消费消息

1
2
$ curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \     http://localhost:8082/consumers/my_consumer_group/instances/my_consumer_instance/topics/jsontest
[{"key":null,"value":{"foo":"bar"},"partition":0,"offset":0}]

删除Consumer实例

1
curl -X DELETE \     http://localhost:8082/consumers/my_consumer_group/instances/my_consumer_instance